#include "config.h"

#include <unistd.h>
#include <string.h>
#include <stdlib.h>
#include <stdio.h>
#include <getopt.h>
#include <dirent.h>
#include <ctype.h>

#define DBG_SUBSYS S_LIBINTERFACE

#include "configure.h"
#include "env.h"
#include "adt.h"
#include "net_table.h"
#include "lichstor.h"
#include "cluster.h"
#include "metadata.h"
#include "storage.h"
#include "volume.h"
#include "lichbd.h"
#include "lich_md.h"
#include "license.h"
#include "net_global.h"
#include "utils.h"
#include "dbg.h"
#include "lich_md.h"
#include "types.h"

#define COPY_THREAD_MAX 10
#define COPY_SIZE_MIN   (4*1024*1024)  /* 4M */
#define LICH_SPLIT_COUNT  40

typedef struct {
        int fd;
        bool thin;
        lichbd_ioctx_t fioctx;
        lichbd_ioctx_t tioctx;
        char *buf;
        char pool[MAX_NAME_LEN];
        void * (*worker)(void *_arg);

        fileid_t fid;
        fileid_t tid;
        uint64_t total_size;
} arg_ext_t;

typedef struct {
        sem_t sem;
        int ret;
        bool thin;
        arg_ext_t ext;
        off_t offset;
        size_t size;
} arg_t;

static int __copy_thread_create(arg_t * arg)
{
        int ret;
        pthread_t th;
        pthread_attr_t ta;

        ret = sem_init(&arg->sem, 0, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        (void) pthread_attr_init(&ta);
        (void) pthread_attr_setdetachstate(&ta, PTHREAD_CREATE_DETACHED);

        ret = pthread_create(&th, &ta, arg->ext.worker, arg);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

static int __copy_thread_startup(uint64_t _left, arg_ext_t *arg_ext)
{
        int ret, count, i, err = 0;
        uint64_t left, offset, size, avg_size;
        arg_t args[COPY_THREAD_MAX];

        count = 0;
        offset = 0;
        left = _left;
        avg_size = _ceil(left, COPY_THREAD_MAX);
        _align(&avg_size, NULL, COPY_SIZE_MIN);

        while (left > 0) {
                size = left < avg_size ? left : avg_size;
                args[count].offset = offset;
                args[count].size = size;
                args[count].ret = 0;
                args[count].thin = arg_ext->thin;
                args[count].ext = *arg_ext;
                args[count].ext.buf = arg_ext->buf + count * avg_size;

                ret = __copy_thread_create(&args[count]);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                count ++;
                offset += size;
                left -= size;
        }

        for (i = 0; i < count; i++) {
                ret = sem_wait(&args[i].sem);
                if (unlikely(ret))
                        err = ret;

                ret = args[i].ret;
                if (unlikely(ret))
                        err = ret;
        }

        if (err) {
                ret = err;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

static int __stor_getattr(const char *pool, const char *path, fileid_t *fid, struct stat *stbuf)
{
        int ret, retry;

        retry = 0;
retry0:
        ret = stor_lookup1(pool, path, fid);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry0, retry, 50, (100 * 1000));
                } else
                        GOTO(err_ret, ret);
        }

        retry = 0;
retry1:
        ret = stor_getattr(pool, fid, stbuf);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry1, retry, 50, (100 * 1000));
                } else
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;

}

static int __stor_create(const char *pool, const char *path, fileid_t *fid, vol_param_t param)
{
        int ret, retry;
        fileid_t parentid;
        char name[MAX_NAME_LEN];
        struct stat stbuf;

        retry = 0;
retry0:
        ret = stor_splitpath(pool, path, &parentid, name);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry0, retry, 10, (1000 * 1000));
                } else
                        GOTO(err_ret, ret);
        }

        strcpy(param.name, name);

        retry = 0;
retry1:
        ret = stor_mkvol_with_area(fid, &parentid, &param);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry1, retry, 10, (1000 * 1000));
                } else if (ret == EEXIST) {
                        if (gloconf.testing) {
                                goto out;
                        }
                        
                        if (retry > 0) {
                                ret = stor_lookup(pool, &parentid, name, fid);
                                if (unlikely(ret))
                                        GOTO(err_ret, ret);

                                ret = stor_getattr(pool, fid, &stbuf);
                                if (unlikely(ret)) {
                                        GOTO(err_ret, ret);
                                }

                                if (stbuf.st_size) {
                                        ret = EEXIST;
                                        GOTO(err_ret, ret);
                                }
                        } else
                                GOTO(err_ret, ret);
                } else
                        GOTO(err_ret, ret);
        }

out:
        return 0;
err_ret:
        return ret;
}

static int __stor_truncate(const char *pool, const fileid_t *fileid, uint64_t size)
{
        int ret, retry = 0;
retry:
        ret = stor_truncate(pool, fileid, size);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
                } else
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

static int __stor_connect(const char *pool, const char *path, lichbd_ioctx_t *ioctx)
{
        int ret, retry = 0;
retry:
        ret = lichbd_connect(pool, path, ioctx, 0);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
                } else
                        GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

static int __copy_from_local_offset(const char *pool, const char *from, const char *to, int idx)
{
        int ret, fd, retry;
        struct stat fst, tst;
        char *buf;
        uint64_t left, offset, size;
        fileid_t fid;
        off_t _offset, _size;
        lichbd_ioctx_t ioctx;

        ret = stat(from, &fst);
        if (ret < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        if (fst.st_size % LICH_IO_ALIGN) {
                fd = open(from, O_RDONLY);
                if (fd < 0) {
                        ret = errno;
                        GOTO(err_ret, ret);
                }
        } else {
                fd = open(from, O_RDONLY | O_DIRECT);
                if (fd < 0) {
                        ret = errno;
                        GOTO(err_ret, ret);
                }
        }

        ret = __stor_getattr(pool, to, &fid, &tst);
        if (unlikely(ret))
                GOTO(err_fd, ret);

        _offset = idx * LICH_CHUNK_SPLIT;
        _size = LICH_CHUNK_SPLIT;

        if (tst.st_size < _offset + fst.st_size) {
                ret = __stor_truncate(pool, &fid, _offset + fst.st_size);
                if (unlikely(ret))
                        GOTO(err_fd, ret);
        }

        if (fst.st_size > _size) {
                ret = EINVAL;
                GOTO(err_fd, ret);
        } else if (fst.st_size < _size && idx != (int)(tst.st_size / LICH_CHUNK_SPLIT)) {
                ret = EINVAL;
                GOTO(err_fd, ret);
        }

        ret = __stor_connect(pool, to, &ioctx);
        if (unlikely(ret))
                GOTO(err_fd, ret);

        ret = posix_memalign((void **)&buf, LICH_IO_ALIGN, BIG_BUF_LEN);
        if (unlikely(ret))
                GOTO(err_fd, ret);

        offset = 0;
        left = fst.st_size;
        while (left > 0) {
                size = left < BIG_BUF_LEN ? left : BIG_BUF_LEN;

                ret = pread(fd, buf, size, offset);
                if (ret < 0) {
                        ret = errno;
                        GOTO(err_free, ret);
                }

                retry = 0;
        retry:
                ret = lichbd_pwrite1(&ioctx, buf, size, offset + _offset, O_DIRECT);
                if (unlikely(ret)) {
                        ret = _errno(ret);
                        if (ret == EAGAIN || ret == ENOSPC) {
                                USLEEP_RETRY(err_free, ret, retry, retry, 100, (100 * 1000));
                        } else
                                GOTO(err_free, ret);
                }

                offset += size;
                left -= size;
        }

        yfree((void **)&buf);
        close(fd);

        return 0;
err_free:
        yfree((void **)&buf);
err_fd:
        close(fd);
err_ret:
        return ret;
}

static int __copy_from_local_apply_check(const char *pool, const char *from, const char *to,
                off_t *_offset, size_t *_size)
{
        int ret, fd;
        char tmp[MAX_INFO_LEN];
        fileid_t fid;
        struct stat stbuf;

        fd = open(from, O_RDONLY);
        if (fd < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        ret = pread(fd, tmp, LICH_SNAPDIFF_BITMAP_OFF, 0);
        if (ret < 0) {
                ret = errno;
                GOTO(err_fd, ret);
        }

        if (memcmp(tmp, LICH_SNAPDIFF_MAGIC, strlen(LICH_SNAPDIFF_MAGIC))) {
                ret = EINVAL;
                GOTO(err_fd, ret);
        }

        *_offset = *(off_t *)(tmp + LICH_SNAPDIFF_OFFSET_OFF);
        *_size = *(size_t *)(tmp + LICH_SNAPDIFF_SIZE_OFF);

        ret = __stor_getattr(pool, to, &fid, &stbuf);
        if (unlikely(ret))
                GOTO(err_fd, ret);

        if ((size_t)stbuf.st_size < *_offset + *_size) {
                ret = __stor_truncate(pool, &fid, *_offset + *_size);
                if (unlikely(ret))
                        GOTO(err_fd, ret);
        }

        close(fd);

        return 0;
err_fd:
        close(fd);
err_ret:
        return ret;
}

static int __copy_from_local_apply(const char *pool, const char *from, const char *to)
{
        int ret, fd, retry, chknum, i, count = 0;
        off_t _offset = 0;
        size_t _size = 0;
        char buf[BIG_BUF_LEN], *tmp;
        uint64_t left, offset, size, foffset;
        lichbd_ioctx_t ioctx;

        ret = __copy_from_local_apply_check(pool, from, to, &_offset, &_size);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        chknum = size2chknum(_size, NULL);
        ret = ymalloc((void **)&tmp, chknum);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        fd = open(from, O_RDONLY);
        if (fd < 0) {
                ret = errno;
                GOTO(err_free, ret);
        }

        ret = pread(fd, tmp, chknum, LICH_SNAPDIFF_BITMAP_OFF);
        if (ret < 0) {
                ret = errno;
                GOTO(err_fd, ret);
        }

        ret = __stor_connect(pool, to, &ioctx);
        if (unlikely(ret))
                GOTO(err_fd, ret);

        for (i = 0; i < chknum; i++) {
                if (*(char *)(tmp + i) == 0)
                        continue;

                count++;

                offset = _offset + i * LICH_CHUNK_SPLIT;
                left = _size - i * LICH_CHUNK_SPLIT;
                left = left > LICH_CHUNK_SPLIT ? LICH_CHUNK_SPLIT : left;
                while (left > 0) {
                        size = left < BIG_BUF_LEN ? left : BIG_BUF_LEN;

                        foffset = LICH_SNAPDIFF_HEAD_LEN(chknum) +
                                (count - 1) * LICH_CHUNK_SPLIT +
                                offset % LICH_CHUNK_SPLIT;

                        ret = pread(fd, buf, size, foffset);
                        if (ret < 0) {
                                ret = errno;
                                GOTO(err_fd, ret);
                        }

                        retry = 0;
                retry:
                        ret = lichbd_pwrite1(&ioctx, buf, size, offset, 0);
                        if (unlikely(ret)) {
                                ret = _errno(ret);
                                if (ret == EAGAIN || ret == ENOSPC) {
                                        USLEEP_RETRY(err_fd, ret, retry, retry, 100, (100 * 1000));
                                } else
                                        GOTO(err_fd, ret);
                        }

                        offset += size;
                        left -= size;
                }
        }

        close(fd);
        yfree((void **)&tmp);

        return 0;
err_fd:
        close(fd);
err_free:
        yfree((void **)&tmp);
err_ret:
        return ret;
}

static int __copy_from_stdout(const char *pool, const char *to, bool thin, vol_param_t tparam)
{
        int ret, fd, retry;
        uint64_t offset, size, image_pos, buflen;
        char *buf;
        fileid_t fid;
        lichbd_ioctx_t ioctx;
        char zerobuf[LICH_CHUNK_SPLIT];

        (void) thin;
        memset(zerobuf, 0x00, LICH_CHUNK_SPLIT);
        ret = __stor_create(pool, to, &fid, tparam);
        if (ret)
                GOTO(err_ret, ret);

        ret = __stor_connect(pool, to, &ioctx);
        if (ret)
                GOTO(err_ret, ret);

        //buflen = LICH_IO_MAX * LICH_SPLIT_MAX;
        buflen = LICH_CHUNK_SPLIT;
        ret = ymalloc((void **)&buf, buflen);
        if (ret)
                GOTO(err_ret, ret);

        fd = 0;
        size = 0;
        offset = 0;
        image_pos = 0;
        while (1) {
                size = read(fd, buf + offset, buflen);
                if (ret < 0) {
                        ret = errno;
                        DERROR("read error, errno: %d\n", errno);
                        GOTO(err_free, ret);
                }

                offset += size;

                if (size && (size < buflen)) {
                        buflen -= size;
                        continue;
                }

                retry = 0;
        retry:
                if (memcmp(zerobuf, buf, offset) != 0) {
                        ret = lichbd_pwrite1(&ioctx, buf, offset, image_pos, 0);
                        if (ret) {
                                ret = _errno(ret);
                                if (ret == EAGAIN || ret == ENOSPC) {
                                        USLEEP_RETRY(err_free, ret, retry, retry, 100, (100 * 1000));
                                } else
                                        GOTO(err_free, ret);
                        }
                }

                image_pos += offset;

                if (size == 0) {
                        break;
                }

                offset = 0;
                //buflen = LICH_IO_MAX * LICH_SPLIT_MAX;
                buflen = LICH_CHUNK_SPLIT;
        }

        yfree((void**)&buf);
        return 0;
err_free:
        yfree((void**)&buf);
err_ret:
        return ret;
}

static void * __copy_from_local_worker(void *_arg)
{
        int ret, retry, size;
        arg_t *arg;
        char *buf, *zerobuf;
        uint64_t left, offset;

        arg = _arg;

        ret = ymalloc((void **)&zerobuf, LICH_CHUNK_SPLIT);
        if (ret)
                GOTO(err_ret, ret);
        memset(zerobuf, 0x00, LICH_CHUNK_SPLIT);

        ret = posix_memalign((void **)&buf, LICH_IO_ALIGN, LICH_CHUNK_SPLIT);
        if (unlikely(ret))
                GOTO(err_free, ret);

        offset = arg->offset;
        left = arg->size;
        while (left > 0) {
                size = min(left, LICH_CHUNK_SPLIT);

                ret = pread(arg->ext.fd, buf, size, offset);
                if (ret != size) {
                        if (ret < 0) {
                                ret = errno;
                        } else {
                                ret = EIO;
                        }

                        GOTO(err_free1, ret);
                }

                if (memcmp(zerobuf, buf, size) != 0) {
                        retry = 0;
retry:
                        ret = lichbd_pwrite1(&arg->ext.tioctx, buf, size, offset, O_DIRECT);
                        if (ret) {
                                ret = _errno(ret);
                                if (ret == EAGAIN || ret == ENOSPC) {
                                        USLEEP_RETRY(err_free1, ret, retry, retry, 100, (100 * 1000));
                                } else
                                        GOTO(err_free1, ret);
                        }
                }

                offset += size;
                left -= size;
        }

        YASSERT(left == 0);

        yfree((void **)&buf);
        yfree((void **)&zerobuf);
        arg->ret = 0;
        sem_post(&arg->sem);

        return NULL;
err_free1:
        yfree((void **)&buf);
err_free:
        yfree((void **)&zerobuf);
err_ret:
        arg->ret = ret;
        sem_post(&arg->sem);
        return NULL;
}

static int __copy_from_prep(const char *pool, const char *from, const char *to,
                            bool thin, vol_param_t tparam, arg_ext_t *arg_ext)
{
        int ret, fd;
        struct stat stbuf;
        fileid_t fid;
        lichbd_ioctx_t ioctx;

        ret = stat(from, &stbuf);
        if (ret < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        if (gloconf.testing == 0) {
            if (stbuf.st_size % LICH_BLOCK_SPLIT) {
                DERROR("The mirror's size must be %d bytes integer times.\n", LICH_BLOCK_SPLIT);
                ret = EPERM;
                GOTO(err_ret, ret);
            }
        }

        if (stbuf.st_size % LICH_IO_ALIGN) {
                fd = open(from, O_RDONLY);
                if (fd < 0) {
                        ret = errno;
                        GOTO(err_ret, ret);
                }
        } else {
                fd = open(from, O_RDONLY | O_DIRECT);
                if (fd < 0) {
                        ret = errno;
                        GOTO(err_ret, ret);
                }
        }

        tparam.size = stbuf.st_size;
        ret = __stor_create(pool, to, &fid, tparam);
        if (unlikely(ret))
                GOTO(err_fd, ret);

        /**
        ret = __stor_truncate(pool, &fid, stbuf.st_size);
        if (unlikely(ret))
                GOTO(err_fd, ret);
        **/

        ret = __stor_connect(pool, to, &ioctx);
        if (unlikely(ret))
                GOTO(err_fd, ret);

        arg_ext->fd = fd;
        arg_ext->tioctx = ioctx;
        arg_ext->worker = __copy_from_local_worker;
        arg_ext->fid = fid;
        arg_ext->total_size = stbuf.st_size;
        arg_ext->thin = thin;

        return 0;
err_fd:
        close(fd);
err_ret:
        return ret;
}


static int __copy_from_local_single(const char *pool, const char *from, const char *to, bool thin, vol_param_t tparam)
{
        int ret;
        arg_ext_t arg_ext;
        arg_t arg;

        ret = __copy_from_prep(pool, from, to, thin, tparam, &arg_ext);
        if (ret)
                GOTO(err_ret, ret);

        arg.offset = 0;
        arg.size = arg_ext.total_size;
        arg.ret = 0;
        arg.ext = arg_ext;
        ret = sem_init(&arg.sem, 0, 0);
        if (ret)
                GOTO(err_fd, ret);

        arg_ext.worker((void *)&arg);
        ret = arg.ret;
        if (arg.ret) {
                GOTO(err_fd, ret);
        }

        close(arg_ext.fd);

        return 0;
err_fd:
        close(arg_ext.fd);
err_ret:
        return ret;
}

static int __copy_from_local_multi(const char *pool, const char *from, const char *to, bool thin, vol_param_t tparam)
{
        int ret;
        arg_ext_t arg_ext;

        ret = __copy_from_prep(pool, from, to, thin, tparam, &arg_ext);
        if (ret)
                GOTO(err_ret, ret);

        ret = __copy_thread_startup(arg_ext.total_size, &arg_ext);
        if (ret)
                GOTO(err_fd, ret);

        close(arg_ext.fd);

        return 0;
err_fd:
        close(arg_ext.fd);
err_ret:
        return ret;
}

static int __copy_to_local_offset(const char *pool, const char *from, const char *to, int idx)
{
        int ret, fd, retry = 0;
        char buf[BIG_BUF_LEN + 1];
        struct stat stbuf;
        uint64_t left, offset, size;
        fileid_t fid;
        off_t _offset, _size;
        lichbd_ioctx_t ioctx;
        int localize = 0;

        ret = __stor_getattr(pool, from, &fid, &stbuf);
        if (ret)
                GOTO(err_ret, ret);

        _offset = idx * LICH_CHUNK_SPLIT;
        if (_offset >= stbuf.st_size) {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }
        _size = LICH_CHUNK_SPLIT;
        _size = _min(_size, stbuf.st_size - _offset);

        ret = __stor_connect(pool, from, &ioctx);
        if (ret)
                GOTO(err_ret, ret);

        fd = open(to, O_WRONLY | O_CREAT | O_TRUNC, 0644);
        if (fd < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        offset = 0;
        left = _size;
        while (left > 0) {
                size = left < BIG_BUF_LEN ? left : BIG_BUF_LEN;

retry:
                ret = lichbd_pread1(&ioctx, buf, size, offset + _offset, localize);
                if (ret < 0) {
                        ret = -ret;
                        if (ret == EAGAIN) {
                                USLEEP_RETRY(err_fd, ret, retry, retry, 100, (100 * 1000));
                        } else
                                GOTO(err_fd, ret);
                }

                YASSERT((uint64_t)ret == size);
                ret = pwrite(fd, buf, size, offset);
                if (ret < 0) {
                        ret = errno;
                        GOTO(err_fd, ret);
                }

                offset += size;
                left -= size;
        }

        close(fd);

        return 0;
err_fd:
        close(fd);
err_ret:
        return ret;

}

static void * __copy_to_local_worker(void *_arg)
{
        int ret, retry, size;
        arg_t *arg;
        char buf[LICH_CHUNK_SPLIT];
        char *zerobuf;
        uint64_t begin, left, offset, i, chknum;
        int localize = 0;

        arg = _arg;

        ret = ymalloc((void **)&zerobuf, LICH_CHUNK_SPLIT);
        if (ret)
                GOTO(err_ret, ret);
        memset(zerobuf, 0x00, LICH_CHUNK_SPLIT);

        offset = arg->offset;
        left = arg->size;

        offset = offset - offset % LICH_CHUNK_SPLIT;
        begin = offset;
        chknum = size2chknum(arg->ext.total_size, NULL);

        for (i = 0; i < chknum; i++) {
                if (LICH_CHUNK_SPLIT * i < begin) {
                        continue;
                }

                size = min(left, LICH_CHUNK_SPLIT);

                // volume export, md5sum inconsistent
                /***
                fid2cid(&chkid, &arg->ext.fid, i);

                retry = 0;
retry1:
                ret = md_chunk_getinfo(arg->ext.pool, NULL, &chkid, chkinfo, NULL);
                if (ret) {
                        ret = _errno(ret);
                        if (ret == ENOENT || ret == ENOKEY) {
                                offset += size;
                                left -= size;
                                continue;
                        } else if (ret == EAGAIN){
                                USLEEP_RETRY(err_free, ret, retry1, retry, 100, (100 * 1000));
                        } else {
                                GOTO(err_free, ret);
                        }
                }**/

                retry = 0;
retry2:
                ret = lichbd_pread1(&arg->ext.fioctx, buf, size, offset, localize);
                if (ret < 0) {
                        ret = -ret;
                        ret = _errno(ret);
                        if (ret == EAGAIN) {
                                USLEEP_RETRY(err_free, ret, retry2, retry, 100, (100 * 1000));
                        } else
                                GOTO(err_free, ret);
                }

                YASSERT(ret == size);

                if (memcmp(zerobuf, buf, size) != 0) {
                        ret = pwrite(arg->ext.fd, buf, size, offset);
                        if (ret != size) {
                                if (ret < 0) {
                                        ret = errno;
                                } else {
                                        ret = EIO;
                                }

                                GOTO(err_free, ret);
                        }
                }

                offset += size;
                left -= size;

                if (left <= 0)
                        break;
        }

        YASSERT(left == 0);

        yfree((void **)&zerobuf);
        arg->ret = 0;
        sem_post(&arg->sem);

        return NULL;
err_free:
        yfree((void **)&zerobuf);
err_ret:
        arg->ret = ret;
        sem_post(&arg->sem);
        return NULL;
}

static int __copy_to_prep(const char *pool, const char *from, const char *to, arg_ext_t *arg_ext)
{
        int ret, fd;
        struct stat stbuf;
        fileid_t fid;
        lichbd_ioctx_t ioctx;

        ret = __stor_getattr(pool, from, &fid, &stbuf);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __stor_connect(pool, from, &ioctx);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        fd = open(to, O_WRONLY | O_CREAT | O_TRUNC, 0644);
        if (fd < 0) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        ret = ftruncate(fd, stbuf.st_size);
        if (ret)
                GOTO(err_fd, ret);

        arg_ext->fd = fd;
        arg_ext->fioctx = ioctx;
        arg_ext->worker = __copy_to_local_worker;
        arg_ext->fid = fid;
        arg_ext->total_size = stbuf.st_size;
        strcpy(arg_ext->pool, pool);

        return 0;
err_fd:
        close(fd);
err_ret:
        return ret;
}

static int __copy_to_local_single(const char *pool, const char *from, const char *to)
{
        int ret;
        arg_ext_t arg_ext;
        arg_t arg;

        ret = __copy_to_prep(pool, from, to, &arg_ext);
        if (ret) {
                GOTO(err_ret, ret);
        }

        arg.offset = 0;
        arg.size = arg_ext.total_size;
        arg.ret = 0;
        arg.ext = arg_ext;
        ret = sem_init(&arg.sem, 0, 0);
        if (ret)
                GOTO(err_fd, ret);

        arg_ext.worker((void *)&arg);
        ret = arg.ret;
        if (arg.ret) {
                GOTO(err_fd, ret);
        }

        close(arg_ext.fd);

        return 0;
err_fd:
        close(arg_ext.fd);
err_ret:
        return ret;
}

static int __copy_to_local_multi(const char *pool, const char *from, const char *to)
{
        int ret;
        arg_ext_t arg_ext;

        ret = __copy_to_prep(pool, from, to, &arg_ext);
        if (ret) {
                GOTO(err_ret, ret);
        }

        ret = __copy_thread_startup(arg_ext.total_size, &arg_ext);
        if (ret)
                GOTO(err_fd, ret);

        close(arg_ext.fd);

        return 0;
err_fd:
        close(arg_ext.fd);
err_ret:
        return ret;
}

static int __copy_standard_single(const char *fpool, const char *from,
                const char *tpool, const char *to, vol_param_t tparam)
{
        int ret, retry = 0, size;
        struct stat stbuf;
        char buf[BIG_BUF_LEN];
        uint64_t left, offset;
        fileid_t fid, src;
        lichbd_ioctx_t fioctx, tioctx;
        int localize = 0;
        char *zerobuf;
        fileinfo_t fileinfo;

        retry = 0;
retry:
        ret = stor_lookup1(fpool, from, &src);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
                } else
                        GOTO(err_ret, ret);
        }

        ret = md_getattr(&src, &fileinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __stor_connect(fpool, from, &fioctx);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (is_lsv(&fileinfo)) {
                tparam.size = fileinfo.logical_size;
                tparam.volume_format = VOLUME_FORMAT_LSV;
                tparam.repnum = fileinfo.repnum_usr;
                tparam.priority = fileinfo.priority;
        } else if (is_row2(&fileinfo)) {
                tparam.size = fileinfo.logical_size;
                tparam.volume_format = VOLUME_FORMAT_ROW2;
                tparam.repnum = fileinfo.repnum_usr;
                tparam.priority = fileinfo.priority;
        } else if (is_row3(&fileinfo)) {
                tparam.size = fileinfo.logical_size;
                tparam.volume_format = VOLUME_FORMAT_ROW3;
                tparam.repnum = fileinfo.repnum_usr;
                tparam.priority = fileinfo.priority;
        } else {
                tparam.size = fileinfo.size;
                tparam.volume_format = VOLUME_FORMAT_RAW;
                tparam.repnum = fileinfo.repnum_usr;
                tparam.priority = fileinfo.priority;
        }

        ret = __stor_create(tpool, to, &fid, tparam);
        if (unlikely(ret)) {
                if (ret == EEXIST) {
                        ret = __stor_getattr(tpool, to, &fid, &stbuf);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        if (stbuf.st_size < tparam.size) {
                                printf("Warning: The volume already exists, but the volume size is smaller than the required size.\n");
                                ret = EPERM;
                                GOTO(err_ret, ret);
                        }
                } else
                        GOTO(err_ret, ret);
        }

        ret = __stor_connect(tpool, to, &tioctx);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = ymalloc((void **)&zerobuf, LICH_CHUNK_SPLIT);
        if (ret)
                GOTO(err_ret, ret);

        memset(zerobuf, 0x00, LICH_CHUNK_SPLIT);

        offset = 0;
        left = tparam.size;
        while (left > 0) {
                size = left < BIG_BUF_LEN ? left : BIG_BUF_LEN;

                retry = 0;
retry1:
                ret = lichbd_pread1(&fioctx, buf, size, offset, localize);
                if (ret < 0) {
                        ret = -ret;
                        ret = _errno(ret);
                        if (ret == EAGAIN) {
                                USLEEP_RETRY(err_ret, ret, retry1, retry, 100, (100 * 1000));
                        } else
                                GOTO(err_free, ret);
                }

                retry = 0;
retry2:
                if (memcmp(zerobuf, buf, size) != 0) {
                        ret = lichbd_pwrite1(&tioctx, buf, size, offset, 0);
                        if (unlikely(ret)) {
                                ret = _errno(ret);
                                if (ret == EAGAIN || ret == ENOSPC) {
                                        USLEEP_RETRY(err_free, ret, retry2, retry, 100, (100 * 1000));
                                } else
                                        GOTO(err_free, ret);
                        }
                }

                offset += size;
                left -= size;
        }

        yfree((void **)&zerobuf);
        return 0;
err_free:
        yfree((void **)&zerobuf);
err_ret:
        return ret;
}

static void * __copy_standard_worker(void *_arg)
{
        int ret, retry;
        arg_t *arg;
        char buf[LICH_CHUNK_SPLIT];
        uint64_t left, offset, size;
        int localize = 0;
        char *zerobuf;

        arg = _arg;

        ret = ymalloc((void **)&zerobuf, LICH_CHUNK_SPLIT);
        if (ret)
                GOTO(err_ret, ret);

        memset(zerobuf, 0x00, LICH_CHUNK_SPLIT);

        offset = arg->offset;
        left = arg->size;
        while (left > 0) {
                //size = left < BIG_BUF_LEN ? left : BIG_BUF_LEN;
                size = min(left, LICH_CHUNK_SPLIT);

                retry = 0;
retry1:
                ret = lichbd_pread1(&arg->ext.fioctx, buf, size, offset, localize);
                if (ret < 0) {
                        ret = -ret;
                        ret = _errno(ret);
                        if (ret == EAGAIN) {
                                USLEEP_RETRY(err_ret, ret, retry1, retry, 100, (100 * 1000));
                        } else
                                GOTO(err_ret, ret);
                }

                retry = 0;
retry2:
                if (memcmp(zerobuf, buf, size) != 0) {
                        ret = lichbd_pwrite1(&arg->ext.tioctx, buf, size, offset, 0);
                        if (unlikely(ret)) {
                                if (ret == EAGAIN) {
                                        USLEEP_RETRY(err_free, ret, retry2, retry, 100, (100 * 1000));
                                } else
                                        GOTO(err_free, ret);
                        }
                }

                offset += size;
                left -= size;
        }

        arg->ret = 0;
        sem_post(&arg->sem);

        yfree((void **)&zerobuf);
        return NULL;
err_free:
        yfree((void **)&zerobuf);
err_ret:
        arg->ret = ret;
        sem_post(&arg->sem);
        return NULL;
}

static int __copy_standard_multi(const char *fpool, const char *from,
                const char *tpool, const char *to, vol_param_t param)
{
        int ret, retry;
        struct stat stbuf;
        fileid_t fid, src;
        arg_ext_t arg_ext;
        lichbd_ioctx_t fioctx, tioctx;
        fileinfo_t fileinfo;

        retry = 0;
retry:
        ret = stor_lookup1(fpool, from, &src);
        if (unlikely(ret)) {
                if (ret == EAGAIN) {
                        USLEEP_RETRY(err_ret, ret, retry, retry, 50, (100 * 1000));
                } else
                        GOTO(err_ret, ret);
        }

        ret = md_getattr(&src, &fileinfo);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = __stor_connect(fpool, from, &fioctx);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (is_row2(&fileinfo) || is_lsv(&fileinfo)) {
                param.size = fileinfo.logical_size;
                param.volume_format = VOLUME_FORMAT_ROW2;
                param.repnum = fileinfo.repnum_usr;
                param.priority = fileinfo.priority;
        } else {
                param.size = fileinfo.size;
                param.volume_format = VOLUME_FORMAT_RAW;
                param.repnum = fileinfo.repnum_usr;
                param.priority = fileinfo.priority;
        }

        ret = __stor_create(tpool, to, &fid, param);
        if (unlikely(ret)) {
                if (ret == EEXIST) {
                        ret = __stor_getattr(tpool, to, &fid, &stbuf);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);

                        if (stbuf.st_size < param.size) {
                                printf("Warning: The volume already exists, but the volume size is smaller than the required size.\n");
                                ret = EPERM;
                                GOTO(err_ret, ret);
                        }
                } else
                        GOTO(err_ret, ret);
        }

        ret = __stor_connect(tpool, to, &tioctx);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        arg_ext.fioctx = fioctx;
        arg_ext.tioctx = tioctx;
        arg_ext.worker = __copy_standard_worker;

        ret = __copy_thread_startup(param.size, &arg_ext);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

int stor_copy_with_area(const char *fpool, const char *from, const char *tpool, const char *to,
                int multi, bool thin, int p, const char *site_name, int volume_format)
{
        vol_param_t tparam;

        if (!from || !to) {
                printf("src or dist can't be null\n");
                return EINVAL;
        }

        //printf("copy from %s to %s\n", from, to);

        vol_param_init(&tparam);
        tparam.priority = p;
        tparam.volume_format = volume_format;
        strcpy(tparam.site, site_name);

        if (from[0] == ':' && to[0] != ':') {
                if (from[1] == '-')
                        return __copy_from_stdout(tpool, to, thin, tparam);
                else if (multi)
                        return __copy_from_local_multi(tpool, from + 1, to, thin, tparam);
                else
                        return __copy_from_local_single(tpool, from + 1, to, thin, tparam);
        } else if (from[0] != ':' && to[0] == ':') {
                if (multi)
                        return __copy_to_local_multi(fpool, from, to + 1);
                else
                        return __copy_to_local_single(fpool, from, to + 1);
        } else if (from[0] != ':' && to[0] != ':') {
                if (multi)
                        return __copy_standard_multi(fpool, from, tpool, to, tparam);
                else
                        return __copy_standard_single(fpool, from, tpool, to, tparam);
        } else {
                UNIMPLEMENTED(__DUMP__);
        }

        return 0;
}

int stor_copy_offset(const char *pool, const char *from, const char *to, int idx)
{
        int ret;

        if (from[0] == ':' && to[0] != ':') {
                ret = __copy_from_local_offset(pool, from + 1, to, idx);
                if (ret)
                        GOTO(err_ret, ret);
        } else if (from[0] != ':' && to[0] == ':') {
                ret = __copy_to_local_offset(pool, from, to + 1, idx);
                if (ret)
                        GOTO(err_ret, ret);
        } else {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

int stor_apply(const char *pool, const char *from, const char *to)
{
        int ret;

        printf("apply from %s to %s\n", from, to);

        if (from[0] == ':' && to[0] != ':') {
                ret = __copy_from_local_apply(pool, from + 1, to);
                if (ret)
                        GOTO(err_ret, ret);
        } else {
                ret = EINVAL;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}

int stor_copy(const char *pool, const char *from, const char *to, int multi, bool thin, int p)
{
        int volume_format = VOLUME_FORMAT_ROW2; //????????????????????????????????????
        return stor_copy_with_area(pool, from, pool, to, multi, thin, p, "", volume_format);
}
